#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import time

import parent
import testvm
from testlib import *


@nondestructive
@skipDistroPackage()
class TestServices(MachineCase):

    def setUp(self):
        super().setUp()

        # Make sure the system finishes "booting" so that
        # when we add additional services to this target below
        # we don't race with the boot process
        self.machine.execute("systemctl start default.target")

        # We manipulate with `/etc/systemd/system` so `daemon-reload` to keep it in proper state
        # Also `reset-failed` as failed units can be still listed with `systemctl` even when all files
        # were removed and daemon reloaded.
        self.restore_dir("/etc/systemd/system", "systemctl daemon-reload && systemctl reset-failed", True)
        user_post_restore = "su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); systemctl --user daemon-reload && systemctl --user reset-failed'"
        self.restore_dir("/etc/systemd/user", user_post_restore, True)
        self.restore_dir("/etc/xdg/systemd/user", user_post_restore, True)

    def pick_tab(self, n):
        self.browser.click(f'#services-filter li:nth-child({n}) a')

    def wait_onoff(self, val):
        self.browser.wait_visible(".service-top-panel .pf-c-switch__input" + (":checked" if val else ":not(:checked)"))

    def toggle_onoff(self):
        self.browser.click(".service-top-panel .pf-c-switch__input")

    def do_action(self, action):
        self.browser.click(".service-top-panel .pf-c-dropdown button")
        self.browser.click(f".service-top-panel .pf-c-dropdown__menu a:contains('{action}')")

    def check_service_details(self, statuses, actions, enabled, onoff=True, kebab=True):
        self.browser.wait_collected_text("#statuses .status", "".join(statuses))
        if onoff:
            self.wait_onoff(enabled)
        else:
            self.browser.wait_not_present(".service-top-panel .pf-c-switch")

        if kebab:
            self.browser.click(".service-top-panel .pf-c-dropdown button")
            self.browser.wait_text(".service-top-panel .pf-c-dropdown__menu", "".join(actions))
            # Click away to close the pf-c-dropdown__menu
            self.browser.click(".service-top-panel .pf-c-dropdown button")
        else:
            self.browser.wait_not_present(".service-top-panel .pf-c-dropdown")

    def svc_sel(self, service):
        return f'tr[data-goto-unit="{service}"]'

    def goto_service(self, service):
        return self.browser.click(self.svc_sel(service) + ' a')

    def wait_service_state(self, service, state):
        state_new = state
        if 'inactive' in state:
            state_new = 'Not running'
        elif 'active' in state:
            state_new = 'Running'
        elif 'failed' in state:
            state_new = 'Failed to'

        self.browser.wait_in_text(self.svc_sel(service), state_new)

    def select_file_state(self, states):
        if not isinstance(states, list):
            states = [states]

        self.browser.click("#services-dropdown-file-state")
        for state in states:
            self.browser.set_checked(f".pf-c-select__menu label:contains('{state}') input", True)
        self.browser.click("#services-dropdown-file-state")
        for state in states:
            self.browser.wait_visible(f".pf-c-chip-group__label:contains('File state') + .pf-c-chip-group__list li:contains('{state}')")

    def select_active_state(self, states):
        if not isinstance(states, list):
            states = [states]

        self.browser.click("#services-dropdown-active-state")
        for state in states:
            self.browser.set_checked(f".pf-c-select__menu label:contains('{state}') input", True)
        self.browser.click("#services-dropdown-active-state")
        for state in states:
            self.browser.wait_visible(f".pf-c-chip-group__label:contains('Active state') + .pf-c-chip-group__list li:contains('{state}')")

    def wait_page_load(self):
        self.browser.wait_not_present(".pf-c-empty-state .pf-c-spinner[aria-valuetext='Loading...']")

    def wait_service_in_panel(self, service, title):
        self.wait_service_state(service, title)

    def wait_service_present(self, service):
        if service.endswith(".service"):
            service_name = service[:-8]
        self.browser.wait_text(f'tr[data-goto-unit="{service}"] .service-unit-id', service_name)

    def wait_service_not_present(self, service):
        self.browser.wait_not_present(self.svc_sel(service))

    def make_test_service(self, path="/etc/systemd/system/"):
        self.write_file(f"{path}/test.service",
                        """
[Unit]
Description=Test Service

[Service]
ExecStart=/bin/sh /usr/local/bin/test-service
ExecReload=/bin/sh true

[Install]
WantedBy=default.target
""")
        self.write_file("/usr/local/bin/test-service",
                        """
#! /bin/sh

trap "echo STOP" 0

echo START
while true; do
  sleep 5
  echo WORKING
done
""")
        # After writing files out tell systemd about them
        self.machine.execute("systemctl daemon-reload")
        self.machine.execute("su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); systemctl --user daemon-reload || true'")

        # When the test fails while `test.service` is active, the process keeps running and
        # `systemct status test.service` still shows it as active until this process dies
        self.addCleanup(self.machine.execute, "for op in stop reset-failed disable; do systemctl $op test test-fail || true; done")
        self.addCleanup(self.machine.execute, "su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); for op in stop reset-failed disable; do systemctl --user $op test test-fail || true; done'")

    def testBasic(self):
        self._testBasic()

    def testBasicSession(self):
        self._testBasic(True)

    def _testBasic(self, user=False):
        m = self.machine
        b = self.browser

        path = "/etc/systemd/user" if user else "/etc/systemd/system"

        self.write_file(f"{path}/test-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false

[Install]
WantedBy=default.target
""")
        self.write_file(f"{path}/test.timer",
                        """
[Unit]
Description=Test Timer

[Timer]
OnCalendar=*:1/2
""")
        self.write_file(f"{path}/test-onboot.timer",
                        """
[Unit]
Description=Test OnBoot Timer

[Timer]
OnBootSec=200min
Unit=test.service
""")

        params = "--user" if user else ""

        def run_cmd(cmd):
            if user:
                m.execute(f"su - admin -c 'export XDG_RUNTIME_DIR=/run/user/$(id -u admin); {cmd}'")
            else:
                m.execute(cmd)

        self.make_test_service(path)
        self.machine.execute(f"systemctl {params} start default.target")

        url = "/system/services#/?owner=user" if user else "/system/services"
        self.login_and_go(url, user="admin")
        self.wait_page_load()

        self.wait_service_present("test.service")
        self.wait_service_in_panel("test.service", "Disabled")
        self.wait_service_state("test.service", "inactive")

        run_cmd(f"systemctl {params} start test.service")
        self.wait_service_state("test.service", "active")
        self.wait_service_in_panel("test.service", "Disabled")

        run_cmd(f"systemctl {params} stop test.service")
        self.wait_service_state("test.service", "inactive")
        self.wait_service_in_panel("test.service", "Disabled")

        run_cmd(f"systemctl {params} enable test.service")
        self.wait_service_state("test.service", "inactive")
        self.wait_service_in_panel("test.service", "Enabled")

        run_cmd(f"systemctl {params} start test.service")

        b.wait_attr("#services-toolbar", "data-loading", "false")

        # Selects Targets tab
        self.pick_tab(2)
        b.wait_not_in_text("#services-list", "test")
        self.wait_service_state("basic.target", "active")

        if not user:
            # Make sure that symlinked services also appear in the list
            self.wait_service_state("reboot.target", "inactive")
            self.wait_service_state("ctrl-alt-del.target", "inactive")
            self.goto_service("ctrl-alt-del.target")
            b.wait_in_text(".service-name", "Reboot")
            b.click("#services-page .pf-c-breadcrumb__link")

        # Selects Sockets tab
        self.pick_tab(3)
        b.wait_not_in_text("#services-list", "test")
        b.wait_not_in_text("#services-list", ".target")
        self.wait_service_state("dbus.socket", "active (running)")

        # Selects Timer tab
        self.pick_tab(4)
        b.wait_not_in_text("#services-list", "test-fail")
        b.wait_not_in_text("#services-list", ".target")
        b.wait_not_in_text("#services-list", ".socket")
        b.wait_visible(self.svc_sel('test.timer'))
        b.wait_text(self.svc_sel('test.timer') + ' .service-unit-triggers', '')
        today = b.eval_js("Intl.DateTimeFormat('en', { dateStyle: 'medium' }).format()")
        run_cmd(f"systemctl {params} start test.timer")
        b.wait_in_text(self.svc_sel('test.timer') + ' .service-unit-next-trigger', today)  # next run
        b.wait_in_text(self.svc_sel('test.timer') + ' .service-unit-last-trigger', "unknown")  # last trigger

        # Timer details should also show information about next and last trigger
        self.goto_service("test.timer")
        b.wait_in_text('.service-unit-next-trigger', today)  # next run
        b.wait_in_text('.service-unit-last-trigger', "unknown")  # last trigger
        b.click(".pf-c-breadcrumb a:contains('Services')")

        run_cmd(f"systemctl {params} stop test.timer")

        b.wait_visible(self.svc_sel('test-onboot.timer'))
        b.wait_text(self.svc_sel('test-onboot.timer') + ' .service-unit-triggers', '')
        run_cmd(f"systemctl {params} start test-onboot.timer")
        # Check the next run. Since it triggers 200mins after the boot, it might be today or tomorrow (after 20:40)
        today_stamp = int(m.execute("date +%s").strip())
        today_plus_200min = m.execute(f"date --date=@{today_stamp + 200 * 60} '+%b %-d, %Y'").strip()
        b.wait_in_text(self.svc_sel('test-onboot.timer') + ' .service-unit-next-trigger', today_plus_200min)
        b.wait_in_text(self.svc_sel('test-onboot.timer') + ' .service-unit-last-trigger', "unknown")  # last trigger
        run_cmd(f"systemctl {params} stop test-onboot.timer")

        # Selects Paths tab - the test VM does not have any user path units
        if not user:
            self.pick_tab(5)
            b.wait_not_in_text("#services-list", "test")
            b.wait_not_in_text("#services-list", ".target")
            b.wait_not_in_text("#services-list", ".socket")
            b.wait_not_in_text("#services-list", ".timer")
            b.wait_visible(self.svc_sel("systemd-ask-password-console.path"))

        # Selects Services tab
        self.pick_tab(1)

        # Survives a burst of events
        suffix = ''
        if user:
            suffix = "?owner=user"
        b.go(f"#/{suffix}")
        self.wait_service_present("test.service")
        m.execute("udevadm trigger; udevadm settle")
        self.goto_service("test.service")
        self.check_service_details(["Running", "Automatically starts"], ["Reload", "Restart", "Stop", "Disallow running (mask)"], True)

        # Stop and disable and back again
        self.toggle_onoff()
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)
        self.toggle_onoff()
        self.check_service_details(["Running", "Automatically starts"], ["Reload", "Restart", "Stop", "Disallow running (mask)"], True)

        testUrl = f'/system/services#/test.service{suffix}'
        # Check without permissions
        b.relogin(testUrl, superuser=False)
        self.wait_page_load()

        b.relogin(testUrl, superuser=True)
        self.wait_page_load()
        self.toggle_onoff()  # later on we need some disabled test
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)

        # Check service that fails to start
        b.go(f'/system/services#/{suffix}')
        self.goto_service("test-fail.service")
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)
        self.toggle_onoff()
        self.check_service_details(["Failed to start", "Automatically starts"], ["Start", "Clear 'Failed to start'", "Disallow running (mask)"], True)
        if not user:
            b.assert_pixels("#service-details", "details-test-fail", ignore=[".cockpit-log-panel .pf-c-card__body", ".pf-c-switch__toggle"])
        b.click(".action-button:contains('Start service')")
        b.go(f'/system/services#/{suffix}')
        self.wait_service_present("test-fail.service")
        self.wait_service_state("test-fail.service", "failed")

        # Check static service
        self.goto_service("systemd-exit.service")
        self.check_service_details(["Static", "Not running"], ["Start", "Disallow running (mask)"], True, onoff=False)

        # Mask and unmask
        self.do_action("Disallow running (mask)")
        b.click("#mask-service button.pf-m-danger")
        self.check_service_details(["Masked"], ["Allow running (unmask)"], False, onoff=False)

        # Masked services have no relationships and therefore the expandable section should not be present
        b.wait_not_present("#service-details-show-relationships")

        self.do_action("Allow running (unmask)")
        self.check_service_details(["Static", "Not running"], ["Start", "Disallow running (mask)"], True, onoff=False)

        def init_filter_state():
            if not b.is_visible("#services-text-filter"):
                b.click(".pf-c-toolbar__toggle button")

            if b.is_present(".pf-c-toolbar__expandable-content.pf-m-expanded button:contains('Clear all filters')"):
                b.click(".pf-c-toolbar__expandable-content.pf-m-expanded button:contains('Clear all filters')")
            elif b.is_present(".pf-c-toolbar__content > .pf-c-toolbar__item > button:contains('Clear all filters')"):
                b.click(".pf-c-toolbar__content > .pf-c-toolbar__item > button:contains('Clear all filters')")
            else:
                b.set_input_text("#services-text-filter input", "")

            self.wait_service_present("test.service")
            self.wait_service_present("test-fail.service")

        b.go(f'/system/services#/{suffix}')

        # Check that `Listen` and `Triggers` properties are shown for socket units
        self.pick_tab(3)
        self.goto_service("dbus.socket")
        if not user:
            self.assertIn("/run/dbus/system_bus_socket (Stream)", b.text("#listen .pf-c-description-list__text"))
        else:
            self.assertIn("/run/user/", b.text("#listen .pf-c-description-list__text"))
        self.assertIn(b.text("#Triggers .pf-c-description-list__text"), ["dbus.service", "dbus-broker.service"])
        b.click(".pf-c-breadcrumb a:contains('Services')")

        # Select services tab
        self.pick_tab(1)

        # Filter by id
        init_filter_state()
        b.set_input_text("#services-text-filter input", "fail.ser")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Filter by description capital letters included
        init_filter_state()
        b.set_input_text("#services-text-filter input", "Test Service")
        self.wait_service_present("test.service")
        self.wait_service_present("test-fail.service")
        if b.cdp.mobile:
            # close the expanded toolbar again to see the whole list
            b.click("#services-page .pf-c-toolbar__toggle button")
            b.wait_not_visible("#services-page .pf-m-search-filter")
            # scroll the menu navbar all the way to the left
            nav_scroll_btn = ".services-header button[aria-label='Scroll left']"
            while b.call_js_func("ph_attr", nav_scroll_btn, "disabled") is None:
                b.click(nav_scroll_btn)
                time.sleep(0.5)
        if not user:
            b.assert_pixels("#services-page", "text-filter-test")

        # Filter by description capitalization not matching the unit description
        init_filter_state()
        b.set_input_text("#services-text-filter input", "failing test service")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Filter by Id capitalization not matching the unit Id
        # NetworkManager exists only as a system service, skip user test
        if not user:
            init_filter_state()
            b.set_input_text("#services-text-filter input", "networkmanager")
            self.wait_service_present("NetworkManager.service")

        # Select only static services
        init_filter_state()
        self.select_file_state("Static")
        self.wait_service_not_present("test.service")
        self.wait_service_not_present("test-fail.service")
        self.wait_service_present("systemd-exit.service")

        # Select only disabled services
        init_filter_state()
        self.select_file_state("Disabled")
        self.wait_service_present("test.service")
        self.wait_service_not_present("test-fail.service")
        self.wait_service_not_present("systemd-exit.service")

        # Select only stopped services
        init_filter_state()
        self.select_active_state("Not running")
        self.wait_service_present("test.service")
        self.wait_service_not_present("test-fail.service")

        # Select only failed services
        init_filter_state()
        self.select_active_state("Failed to start")
        self.wait_service_not_present("test.service")
        self.wait_service_present("test-fail.service")

        # Select Alias and Masked services
        # No indirect user services exist, skip user test
        if not user:
            init_filter_state()
            self.select_file_state(["Indirect", "Masked"])
            self.wait_service_not_present("test.service")
            self.wait_service_present("getty@tty1.service")

        # Check filtering and selecting together - empty state
        init_filter_state()
        b.set_input_text("#services-text-filter input", "failing")
        self.select_active_state("Not running")
        self.wait_service_not_present("test.service")
        b.wait_visible("#services-page .pf-c-empty-state")

        # Check resetting filter
        b.click("#clear-all-filters")
        self.wait_service_present("test.service")
        self.wait_service_present("test-fail.service")
        self.wait_service_present("systemd-exit.service")
        self.assertEqual(b.val("#services-text-filter input"), "")

        # Check that closing filter chip groups or single chips works
        init_filter_state()
        self.select_active_state("Not running")
        self.select_active_state("Running")
        self.wait_service_present("test.service")
        b.click(".pf-c-chip-group__label:contains('Active state') + .pf-c-chip-group__list li:contains('Not running') button")
        b.wait_not_present(".pf-c-chip-group__label:contains('Active state') + .pf-c-chip-group__list li:contains('Not running')")
        self.wait_service_not_present("test.service")
        b.click(".pf-c-chip-group:contains('Active state') .pf-c-chip-group__close > button")
        b.wait_not_present(".pf-c-chip-group")

        # Check that journalbox shows empty state
        self.goto_service("systemd-exit.service")
        if not user:
            b.wait_text('.cockpit-log-panel .pf-c-card__body', "No log entries")
        else:
            b.wait_not_present('.cockpit-log-panel')
        b.wait_not_present("button:contains('View all logs')")

        b.go(f'/system/services#/{suffix}')

        self.write_file(f"/tmp/mem-hog{params}.awk", f'BEGIN {{ system("while [ ! -f /tmp/continue{params} ]; do sleep 1; done"); y = sprintf("%50000000s",""); system("sleep infinity") }}')
        self.write_file(f"{path}/mem-hog.service",
                        f"""
[Unit]
Description=Mem Hog Service

[Service]
ExecStart=/bin/awk -f /tmp/mem-hog{params}.awk
""")

        # After writing files out tell systemd about them
        run_cmd(f"systemctl {params} daemon-reload")

        # Check that MemoryCurrent is shown
        self.goto_service("mem-hog.service")
        b.wait_not_present("#memory")
        # In some distros systemctl is showing memory current as [Not Set] for user units
        if not user or m.image not in ["rhel-8-5", "rhel-8-6", "ubuntu-2004", "centos-8-stream"]:
            run_cmd(f"systemctl {params} start mem-hog.service")
            # If the test fails before we stop the mem-hog the next test run will not get the correct memory usage here
            self.addCleanup(run_cmd, f"systemctl {params} stop mem-hog.service || true")
            initial_memory = float(b.text("#memory .pf-c-description-list__text").split(" ")[0])
            self.assertGreater(initial_memory, 0.5)
            run_cmd(f"touch /tmp/continue{params}")
            self.addCleanup(m.execute, f"rm /tmp/continue{params}")
            # MemoryCurrent auto-updates every 30s - default timeout is 60s - when it updates the memory used should be ~50MiB
            b.wait(lambda: float(b.text("#memory .pf-c-description-list__text").split(" ")[0]) > 40)
            run_cmd(f"systemctl {params} stop mem-hog")
            b.wait_not_present("#memory")
        # Check that listen is not shown for .service units
        b.wait_not_present("#listen")

    def testLogs(self):
        b = self.browser

        self.make_test_service()

        self.login_and_go("/system/services")
        self.wait_page_load()

        # Check test.service and then start it
        self.goto_service("test.service")
        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)
        self.toggle_onoff()
        self.check_service_details(["Running", "Automatically starts"], ["Reload", "Restart", "Stop", "Disallow running (mask)"], True)
        b.wait_in_text('.cockpit-log-panel .pf-c-card__body', "START")
        b.wait_in_text('.cockpit-log-panel .pf-c-card__body', "WORKING")
        b.click(".cockpit-log-panel .pf-c-card__header button")

        b.enter_page("/system/logs")
        b.wait_in_text("#journal-box .cockpit-log-panel > div:nth-child(2)", "WORKING")
        t = b.text("#journal-box .cockpit-log-panel > div:nth-child(3)") + b.text("#journal-box .cockpit-log-panel > div:nth-child(4)")
        self.assertIn("START", t)
        if b.cdp.mobile:
            b.click("button[aria-label='Show Filters']")
        b.wait_val("#journal-grep input", "priority:debug service:test.service ")
        b.go("/system/services#test.service")
        b.enter_page("/system/services")

        b.wait_in_text(".cockpit-logline:nth-child(2)", "WORKING")
        b.click(".cockpit-logline:nth-child(2)")
        b.enter_page("/system/logs")
        b.wait_text(".pf-c-card__title", "WORKING")
        b.click(".pf-c-breadcrumb a:contains('Logs')")
        if b.cdp.mobile:
            b.click("button[aria-label='Show Filters']")
        b.wait_val("#journal-grep input", "priority:debug service:test.service ")
        b.go("/system/services#test.service")
        b.enter_page("/system/services")

        b.wait_in_text(".cockpit-logline:nth-child(2)", "WORKING")
        b.click(".cockpit-logline:nth-child(2)")
        b.enter_page("/system/logs")
        b.click("button:contains('Go to test.service')")
        b.enter_page("/system/services")
        b.wait_js_cond('window.location.hash === "#/test.service"')

    def testApi(self):
        b = self.browser

        self.make_test_service()

        self.login_and_go("/playground/service#/test")

        b.wait_text('#exists', 'true')
        b.wait_text('#state', '"stopped"')
        b.wait_text('#enabled', 'false')

        b.click('#start')
        b.wait_text('#state', '"running"')
        b.click('#stop')
        b.wait_text('#state', '"stopped"')

        b.click('#enable')
        b.wait_text('#enabled', 'true')
        b.click('#disable')
        b.wait_text('#enabled', 'false')

        b.go('#/foo')
        b.wait_text('#exists', 'false')

    def testConditions(self):
        m = self.machine
        b = self.browser
        self.write_file("/etc/systemd/system/condtest.service",
                        """
[Unit]
Description=Test Service
ConditionDirectoryNotEmpty=/var/tmp/empty

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"

[Install]
WantedBy=multi-user.target
""")

        m.execute("mkdir -p /var/tmp/empty")
        m.execute("rm -rf /var/tmp/empty/*")
        self.addCleanup(m.execute, "systemctl stop condtest")

        # After writing files out tell systemd about them
        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start multi-user.target")

        # This does not work for not enabled services. See:
        # https://github.com/systemd/systemd/issues/2234
        self.machine.execute("systemctl enable condtest")

        self.login_and_go("/system/services")

        # Selects Services tab
        self.pick_tab(1)

        self.wait_service_in_panel("condtest.service", "Enabled")
        self.goto_service("condtest.service")
        self.check_service_details(["Not running", "Automatically starts"], ["Start", "Disallow running (mask)"], True)
        self.do_action("Start")
        b.wait_text("#condition", "Condition ConditionDirectoryNotEmpty=/var/tmp/empty was not met")

        # If the condition succeeds the message disappears
        m.execute("touch /var/tmp/empty/non-empty")
        self.addCleanup(m.execute, "rm /var/tmp/empty/non-empty")
        self.do_action("Start")
        self.check_service_details(["Running", "Automatically starts"], ["Restart", "Stop", "Disallow running (mask)"], True)
        b.wait_not_present("#condition")

    def testRelationships(self):
        m = self.machine
        b = self.browser

        self.write_file("/etc/systemd/system/test-a.service",
                        """
[Unit]
Description=A Service
Before=test-b.service
Conflicts=test-c.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        self.write_file("/etc/systemd/system/test-b.service",
                        """
[Unit]
Description=B Service
After=test-a.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        self.write_file("/etc/systemd/system/test-c.service",
                        """
[Unit]
Description=C Service
Conflicts=test-a.service
PartOf=test-b.service

[Service]
ExecStart=/bin/sh -c "while true; do sleep 5; done"
""")

        # After writing files out tell systemd about them
        m.execute("systemctl daemon-reload")

        def rel_sel(reltype, service):
            return f"#{''.join(reltype.split(' '))} a:contains('{service}')"

        # services page
        self.login_and_go("/system/services")
        self.wait_page_load()
        self.wait_service_present("test-a.service")
        self.goto_service("test-a.service")

        # service a
        b.wait_js_cond('window.location.hash === "#/test-a.service"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Before", "test-b.service"))
        b.wait_visible(rel_sel("Conflicts", "test-c.service"))
        b.click(rel_sel("Before", "test-b.service"))

        # service b
        b.wait_js_cond('window.location.hash === "#/test-b.service"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("After", "test-a.service"))
        b.click(rel_sel("After", "test-a.service"))

        # service a
        b.wait_js_cond('window.location.hash === "#/test-a.service"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Conflicts", "test-c.service"))
        b.click(rel_sel("Conflicts", "test-c.service"))

        # service c
        b.wait_js_cond('window.location.hash === "#/test-c.service"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("Conflicts", "test-a.service"))
        b.wait_visible(rel_sel("Part of", "test-b.service"))
        b.click(rel_sel("Part of", "test-b.service"))

        # service b
        b.wait_js_cond('window.location.hash === "#/test-b.service"')
        b.click("#service-details-show-relationships button")
        b.wait_visible(rel_sel("After", "test-a.service"))

    def testNotFound(self):
        m = self.machine
        b = self.browser

        self.write_file("/etc/systemd/system/test.service",
                        """
[Unit]
Description=Test Service
Requires=not-found.service

[Service]
ExecStart=/bin/true
""")
        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")

        self.login_and_go("/system/services")
        self.wait_page_load()

        b.wait_visible(self.svc_sel("test.service"))
        b.wait_not_present(self.svc_sel("not-found.service"))

        self.goto_service("test.service")
        b.wait_js_cond('window.location.hash === "#/test.service"')

        b.click("#service-details-show-relationships button")
        b.wait_visible("#Requires a.pf-m-disabled:contains('not-found.service')")

    def testResetFailed(self):
        m = self.machine
        b = self.browser

        # Put it in /run instead of in /etc so that we can mask it,
        # which needs to put a symlink into /etc.
        self.write_file("/run/systemd/system/test-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false

[Install]
WantedBy=default.target
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")

        self.login_and_go("/system/services")
        self.wait_page_load()

        self.goto_service("test-fail.service")
        b.wait_js_cond('window.location.hash === "#/test-fail.service"')

        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)

        self.wait_onoff(False)
        self.toggle_onoff()

        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)"], True)

        # Disabling should reset the "Failed to start" status
        self.wait_onoff(True)
        self.toggle_onoff()

        self.check_service_details(["Disabled"], ["Start", "Disallow running (mask)"], False)

        self.wait_onoff(False)
        self.toggle_onoff()

        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)"], True)

        # Just reset the failed status, but leave it enabled
        self.do_action("Clear")

        self.check_service_details(["Not running", "Automatically starts"],
                                   ["Start", "Disallow running (mask)"], True)

        self.do_action("Start")
        self.check_service_details(["Failed to start", "Automatically starts"],
                                   ["Start", "Clear 'Failed to start'", "Disallow running (mask)"], True)

        # Masking should also clear the failed status
        self.do_action("Disallow running (mask)")
        b.click("#mask-service button.pf-m-danger")
        self.check_service_details(["Masked"], ["Allow running (unmask)"], False, onoff=False)

    def count_failures(self):
        return int(self.machine.execute("systemctl --failed --plain --no-legend | wc -l"))

    def check_system_menu_services_error(self, expected):
        b = self.browser
        b.switch_to_top()
        if b.cdp.mobile:
            b.click("#nav-system-item")
        if expected:
            b.wait_visible("#services-error")
        else:
            b.wait_not_present("#services-error")
        if b.cdp.mobile:
            # close menu again
            b.click("#nav-system-item")

    def testNotifyFailed(self):
        m = self.machine
        b = self.browser

        # We use two services with prefix "aaa" and "aab" so that
        # they always occupy the first two rows and we can check their
        # relative order depending on whether "aab" has failed or not.

        self.write_file("/etc/systemd/system/aaa.service",
                        """
[Unit]
Description=First Row Service

[Service]
ExecStart=/usr/bin/true
""")
        self.write_file("/etc/systemd/system/aab-fail.service",
                        """
[Unit]
Description=Failing Test Service

[Service]
ExecStart=/usr/bin/false
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")
        self.machine.execute("systemctl reset-failed")

        self.login_and_go("/system/services")
        self.wait_page_load()

        # Start test-fail
        m.execute("systemctl start aab-fail")

        # Services tab should have icon
        # Aab-fail should be at the top, and have failed
        b.wait_visible('a:contains("Services") .ct-exclamation-circle')
        b.wait_visible('tr[data-goto-unit="aab-fail.service"]')
        b.wait_visible('tr[data-goto-unit="aab-fail.service"] .service-unit-status:contains("Failed")')

        # Nav link should have icon
        self.check_system_menu_services_error(True)

        n_failures = self.count_failures()
        health_message = "1 service has failed" if n_failures == 1 else f"{n_failures} services have failed"

        # System page should have notification
        b.click_system_menu("/system")
        b.wait_in_text(".system-health-events", health_message)

        # Clear aab-fail
        m.execute("systemctl reset-failed aab-fail")

        # aab-fail should not be at the top
        b.switch_to_top()
        b.click_system_menu("/system/services")
        b.wait_not_present('li:nth-child(1)[data-goto-unit="aab-fail.service"]')

        if self.count_failures() == 0:
            # Services tab should not have icon
            b.wait_not_present('a:contains("Services") .ct-exclamation-circle')

            # Nav link should not have icon
            self.check_system_menu_services_error(False)

            # System page should not have notification
            b.click_system_menu('/system')
            b.wait_not_in_text(".system-health-events", "service has failed")
            b.wait_not_in_text(".system-health-events", "services have failed")

    def testHiddenFailure(self):
        m = self.machine
        b = self.browser

        self.write_file("/etc/systemd/system/fail.mount",
                        """
[Unit]
Description=Failing Mount

[Mount]
What=wrong
Where=/fail
""")

        m.execute("systemctl daemon-reload")
        self.machine.execute("systemctl start default.target")
        self.machine.execute("systemctl reset-failed")
        m.execute("systemctl start fail.mount || true")

        self.login_and_go("/system/services")
        self.wait_page_load()

        if self.count_failures() == 1:
            # Nav link should not have icon
            self.check_system_menu_services_error(False)

            # System page should not have notification
            b.click_system_menu("/system")
            b.wait_not_in_text(".system-health-events", "service has failed")
            b.wait_not_in_text(".system-health-events", "services have failed")

        self.allow_journal_messages(".*type=1400 audit(.*): avc:  denied  { create } .* comm=\"systemd\" name=\"fail\".*")

    def testTransientFailingUnits(self):
        self.login_and_go("/system/services")
        self.wait_page_load()

        self.machine.execute("systemd-run --collect --unit test-autocollect@1.service sh -c 'sleep 10; false'")
        self.machine.execute("systemd-run --unit test-manual-collect@1.service sh -c 'sleep 3; false'")

        self.wait_service_present("test-autocollect@1.service")
        self.wait_service_present("test-manual-collect@1.service")

        self.wait_service_in_panel("test-manual-collect@1.service", "Failed to start")

        self.wait_service_not_present("test-autocollect@1.service")
        self.wait_service_present("test-manual-collect@1.service")

        self.machine.execute("systemctl reset-failed")
        self.wait_service_not_present("test-manual-collect@1.service")


@skipDistroPackage()
class TestTimers(MachineCase):
    def svc_sel(self, service):
        return f'tr[data-goto-unit="{service}"]'

    def wait_page_load(self):
        self.browser.wait_not_present(".pf-c-empty-state .pf-c-spinner[aria-valuetext='Loading...']")

    def testCreate(self):
        m = self.machine
        b = self.browser

        def wait_systemctl_timer(time):
            with testvm.Timeout(seconds=20, machine=m, error_message="Timeout while waiting for systemctl list-timers"):
                m.execute(f"cmd='systemctl list-timers'; until $cmd | grep -m 1 '{time}'; do sleep 1; done")

        # HACK: pmie and pmlogger take a looooong time to shutdown (https://bugzilla.redhat.com/show_bug.cgi?id=1703348)
        # so disable them for this test, we don't test PCP here
        m.execute("systemctl disable --now pmie pmlogger || true")

        # set an initial baseline date/time, to ensure that we never jump backwards in subsequent tests
        m.execute("timedatectl set-timezone UTC")
        m.execute("""ntp=`timedatectl show --property NTP --value`
                     if [ $ntp == "yes" ]; then
                         timedatectl set-ntp off
                     fi""")
        wait(lambda: "false" in m.execute(
            "busctl get-property org.freedesktop.timedate1 /org/freedesktop/timedate1 org.freedesktop.timedate1 NTP"))
        m.execute("timedatectl set-time '2036-01-01 12:00:00'")
        m.spawn("sync && sync && sync && sleep 0.1 && reboot", "reboot")
        m.wait_reboot()

        m.execute("timedatectl set-time '2036-01-01 15:30:00'")
        self.login_and_go("/system/services")
        self.wait_page_load()
        # Select "Timers" tab
        self.browser.click('#services-filter li:nth-child(4) a')
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "testing timer")
        m.execute("rm -f /tmp/date")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.set_input_text(".create-timer-time-picker input", "24:6s")
        b.click("#timer-save-button")

        # checks for invalid input messages
        b.wait_text("#servicename-helper", "Only alphabets, numbers, : , _ , . , @ , - are allowed")
        b.wait_text("#description-helper", "This field cannot be empty")
        b.wait_text(".pf-c-date-picker__helper-text", "Invalid time format")

        # creates a new yearly timer at 2036-01-01 16:00 and at 2037-01-01 01:22
        b.set_input_text("#servicename", "yearly_timer")
        b.set_input_text("#description", "Yearly timer")
        b.select_from_dropdown("#drop-repeat", "yearly")
        b.click("[data-index=0] [aria-label=Add]")
        b.set_input_text("[data-index=0] .pf-c-date-picker:nth-child(1) input", "2036-01-01")
        b.set_input_text("[data-index=0] .pf-c-date-picker:nth-child(2) input", "16:00")
        b.set_input_text("[data-index=1] .pf-c-date-picker:nth-child(1) input", "2037-01-01")
        b.set_input_text("[data-index=1] .pf-c-date-picker:nth-child(2) input", "01:22")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('yearly_timer.timer'))

        m.execute("timedatectl set-time '2036-01-01 15:30:00'")
        b.wait_visible(self.svc_sel('yearly_timer.timer'))
        b.wait_in_text(self.svc_sel('yearly_timer.timer'), "Yearly timer")
        wait_systemctl_timer("2036-01-01 16:00")
        self.assertIn("2036-01-01 16:00", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-01-01 16:10:00'")
        # checks if yearly timer repeats yearly on 2037-01-01 01:22
        wait_systemctl_timer("2037-01-01 01:22")
        self.assertIn("2037-01-01 01:22", m.execute("systemctl list-timers"))
        # creates a new monthly timer that runs on 6th at 14:12 and 8th at 21:12 of every month
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "monthly_timer")
        b.set_input_text("#description", "Monthly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "monthly")
        b.select_from_dropdown("[data-index=0] .month-days", "6")
        b.set_input_text("[data-index=0] .create-timer-time-picker input", "14:12")
        b.click("[data-index=0] [aria-label=Add]")
        b.select_from_dropdown("[data-index=1] .month-days", "8")
        b.set_input_text("[data-index=1] .create-timer-time-picker input", "21:12")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('monthly_timer.timer'))

        m.execute("timedatectl set-time '2036-01-01 16:15:00'")
        b.wait_visible(self.svc_sel('monthly_timer.timer'))
        b.wait_in_text(self.svc_sel('monthly_timer.timer'), "Monthly timer")
        wait_systemctl_timer("2036-01-06 14:12")
        self.assertIn("2036-01-06 14:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-01-07 00:00:00'")
        wait_systemctl_timer("2036-01-08 21:12")
        self.assertIn("2036-01-08 21:12", m.execute("systemctl list-timers"))
        # checks if timer runs on next month February 2036 on same dates
        m.execute("timedatectl set-time '2036-01-08 21:23'")
        wait_systemctl_timer("2036-02-06 14:12")
        self.assertIn("2036-02-06 14:12", m.execute("systemctl list-timers"))
        # checks if timer runs on 8th March 2036 at 21:12
        m.execute("timedatectl set-time '2036-03-07 00:00:00'")
        wait_systemctl_timer("2036-03-08 21:12")
        self.assertIn("2036-03-08 21:12", m.execute("systemctl list-timers"))
        # creates a new weekly timer that runs on Fri at 12:45 and Sun at 20:12 every week
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "weekly_timer")
        b.set_input_text("#description", "Weekly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "weekly")
        b.wait_visible("[data-index=0] .week-days")
        b.click("[data-index=0] [aria-label=Add]")
        b.select_from_dropdown("[data-index=0] .week-days", "fri")
        b.set_input_text("[data-index=0] .create-timer-time-picker input", "12:45")
        b.select_from_dropdown("[data-index=1] .week-days", "sun")
        b.set_input_text("[data-index=1] .create-timer-time-picker input", "20:12")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('weekly_timer.timer'))

        m.execute("timedatectl set-time '2036-03-08 00:00:00'")
        b.wait_visible(self.svc_sel('weekly_timer.timer'))
        b.wait_in_text(self.svc_sel('weekly_timer.timer'), "Weekly timer")
        wait_systemctl_timer("Sun 2036-03-09 20:12")
        self.assertIn("Sun 2036-03-09 20:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-10 00:00:00'")
        wait_systemctl_timer("Fri 2036-03-14 12:45")
        self.assertIn("Fri 2036-03-14 12:45", m.execute("systemctl list-timers"))
        # checks if timer runs on next week's Friday and Sunday
        m.execute("timedatectl set-time '2036-03-15 00:00:00'")
        wait_systemctl_timer("Sun 2036-03-16 20:12")
        self.assertIn("Sun 2036-03-16 20:12", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-17 00:00:00'")
        wait_systemctl_timer("Fri 2036-03-21 12:45")
        self.assertIn("Fri 2036-03-21 12:45", m.execute("systemctl list-timers"))
        # creates a new daily timer that runs at 2:40 and at 21:15 every day
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "daily_timer")
        b.set_input_text("#description", "Daily timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "daily")
        b.click("[data-index=0] [aria-label=Add]")
        b.set_input_text("[data-index=0] .create-timer-time-picker input", "2:40")
        b.set_input_text("[data-index=1] .create-timer-time-picker input", "21:15")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('daily_timer.timer'))

        m.execute("timedatectl set-time '2036-03-17 00:00:00'")
        b.wait_visible(self.svc_sel('daily_timer.timer'))
        b.wait_in_text(self.svc_sel('daily_timer.timer'), "Daily timer")
        wait_systemctl_timer("2036-03-17 02:40")
        self.assertIn("2036-03-17 02:40", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-03-17 03:00:00'")
        wait_systemctl_timer("2036-03-17 21:15")
        self.assertIn("2036-03-17 21:15", m.execute("systemctl list-timers"))
        # checks if timer runs on 2036-04-10 at 02:40 and 21:15
        m.execute("timedatectl set-time '2036-04-10 00:00:00'")
        wait_systemctl_timer("2036-04-10 02:40")
        self.assertIn("2036-04-10 02:40", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 03:00:00'")
        wait_systemctl_timer("2036-04-10 21:15")
        self.assertIn("2036-04-10 21:15", m.execute("systemctl list-timers"))
        # creates a new houry timer that runs at *:05 and at *:26
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "hourly_timer")
        b.set_input_text("#description", "Hourly timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.select_from_dropdown("#drop-repeat", "hourly")
        b.click("[data-index=0] [aria-label=Add]")
        b.set_input_text("[data-index=0] input", "05")
        b.set_input_text("[data-index=1] input", "26")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('hourly_timer.timer'))

        m.execute("timedatectl set-time '2036-04-10 03:00:00'")
        b.wait_visible(self.svc_sel('hourly_timer.timer'))
        b.wait_in_text(self.svc_sel('hourly_timer.timer'), "Hourly timer")
        wait_systemctl_timer("2036-04-10 03:05")
        self.assertIn("2036-04-10 03:05", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 03:07:00'")
        wait_systemctl_timer("2036-04-10 03:26")
        self.assertIn("2036-04-10 03:26", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 04:00:00'")
        wait_systemctl_timer("2036-04-10 04:05")
        # checks if timer runs on next hour at 5 min and 26 min
        self.assertIn("2036-04-10 04:05", m.execute("systemctl list-timers"))
        m.execute("timedatectl set-time '2036-04-10 04:10:00'")
        wait_systemctl_timer("2036-04-10 04:26")
        self.assertIn("2036-04-10 04:26", m.execute("systemctl list-timers"))
        # creates a new timer that runs at today at 23:59
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "no_repeat_timer")
        b.set_input_text("#description", "No repeat timer")
        b.set_input_text("#command", "/bin/sh -c '/bin/date >> /tmp/date'")
        b.click("input[value=specific-time]")
        b.set_input_text(".create-timer-time-picker input", "23:59")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('no_repeat_timer.timer'))
        b.wait_in_text(self.svc_sel('no_repeat_timer.timer'), "No repeat timer")

        m.execute("timedatectl set-time '2036-04-10 04:10:00'")
        b.wait_visible(self.svc_sel('no_repeat_timer.timer'))
        wait_systemctl_timer("2036-04-10 23:59")
        self.assertIn("2036-04-10 23:59", m.execute("systemctl list-timers"))

        # creates a boot timer that runs after 10 sec from boot
        b.wait_visible("#create-timer")
        b.click('#create-timer')
        b.wait_visible("#timer-dialog")
        b.set_input_text("#servicename", "boot_timer")
        b.set_input_text("#description", "Boot timer")
        b.set_input_text("#command", "/bin/sh -c 'echo hello >> /tmp/hello'")
        b.click("input[value=system-boot]")
        b.set_input_text(".delay-group input", "2")
        b.click("#timer-save-button")
        b.wait_not_present("#timer-dialog")
        b.wait_visible(self.svc_sel('boot_timer.timer'))
        m.spawn("sync && sync && sync && sleep 0.1 && reboot", "reboot")
        m.wait_reboot()
        m.start_cockpit()
        with testvm.Timeout(seconds=15, machine=m, error_message="Timeout while waiting for boot timer to run"):
            m.execute("while [ ! -f /tmp/hello ] ; do sleep 0.5; done")
        self.assertIn("hello", m.execute("cat /tmp/hello"))

        self.allow_restart_journal_messages()


if __name__ == '__main__':
    test_main()
